package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/13
 *
 * Windows can be classified along three axes:
 *   type:                time, count
 *   characteristic:      tumbling, sliding, session
 *   compute parallelism: global, keyed
 *
 * This demo shows a keyed tumbling count window.
 */
public class Demo2_KeyedCountWindow
{
    /**
     * Wires up the demo pipeline (socket source, map to WaterSensor, key by sensor id,
     * count window of 3, process function, print sink) and then runs the job.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();

        openKeyedCountWindow(env)
            /*
                   Non-keyed (global) window: ProcessAllWindowFunction<IN, OUT, W extends Window>
                   Keyed window:              ProcessWindowFunction<IN, OUT, KEY, W extends Window>
                            IN:  the type sent by upstream, i.e. the element type held in the window; it is fixed
                            OUT: the output type, chosen by you
                            W:   tells whether the current window is a count window or a time window
             */
            .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>()
            {
                /**
                 * Called when a window is evaluated. Prints a marker line to stdout, then emits one
                 * string made of the key, a fixed label and the result of MyUtil.parseToList(elements).
                 *
                 * @param key      the key of the window being evaluated
                 * @param context  window context; not used here
                 * @param elements the elements currently held in the window
                 * @param out      collector that receives the single result string
                 */
                @Override
                public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    System.out.println("Demo2_KeyedCountWindow.process");
                    String windowSummary = key + "的窗口被计算了:" + MyUtil.parseToList(elements);
                    out.collect(windowSummary);
                }
            })
            // A keyed window computation is allowed to have its own parallelism.
            .setParallelism(10)
            .print();

        runJob(env);
    }

    /**
     * Builds the execution environment from a Configuration whose "rest.port" key is set to 3333
     * (presumably the port of the local REST endpoint / web UI -- confirm against the Flink version
     * in use), and sets the environment parallelism to 2.
     */
    private static StreamExecutionEnvironment createEnvironment() {
        Configuration restConf = new Configuration();
        restConf.setInteger("rest.port", 3333);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(restConf);
        env.setParallelism(2);
        return env;
    }

    /**
     * Reads text lines from the socket hadoop102:8888, maps them with WaterSensorMapFunction,
     * keys the stream by WaterSensor::getId and opens a count window of 3 on each key.
     */
    private static WindowedStream<WaterSensor, String, GlobalWindow> openKeyedCountWindow(StreamExecutionEnvironment env) {
        return env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .keyBy(WaterSensor::getId)
            // Tumbling: size = 3 (the window holds at most 3 elements), slide = 3 (the window fires
            // once every 3 elements), i.e. compute once per 3 elements, over those 3 elements.
            .countWindow(3L);
    }

    /**
     * Executes the job. Any exception thrown by execute() is printed via printStackTrace()
     * and not rethrown.
     */
    private static void runJob(StreamExecutionEnvironment env) {
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
